package com.dahuan.tables;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Table_Example {

    /**
     * Flink Table API / SQL demo: reads sensor readings from a local text file,
     * parses them into a typed {@code SensorReading} stream, then filters for
     * {@code sensor_1} twice — once via the Table API and once via a SQL query —
     * and prints both result streams as append streams.
     *
     * @param args unused command-line arguments
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parallelism 1 keeps the printed output in a single, deterministic stream.
        env.setParallelism( 1 );

        String path = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        DataStreamSource<String> inputSource = env.readTextFile( path );

        // Parse each CSV line "id,timestamp,temperature" into a SensorReading.
        // Long.parseLong / Double.parseDouble replace the deprecated boxing
        // constructors new Long(...) / new Double(...); trim() tolerates stray
        // whitespace around the numeric fields.
        SingleOutputStreamOperator<SensorReading> dataStream = inputSource.map( line -> {
            String[] fields = line.split( "," );
            return new SensorReading(
                    fields[0],
                    Long.parseLong( fields[1].trim() ),
                    Double.parseDouble( fields[2].trim() ) );
        } );

        // Create the table environment bound to the streaming environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        // Create a Table directly from the typed DataStream.
        Table dataTable = tableEnv.fromDataStream( dataStream );

        // Table API variant: project id/temperature and filter on sensor_1.
        Table resultTable = dataTable.select( "id,temperature" )
                .where( "id = 'sensor_1'" );

        // SQL variant: register the stream as a temporary view, then run the
        // equivalent query against it.
        tableEnv.createTemporaryView( "sensor", dataStream);
        String sql = "select id,temperature from sensor where id = 'sensor_1'";
        Table resultSqlTable = tableEnv.sqlQuery( sql );

        // Convert both result tables back to append streams and print them.
        tableEnv.toAppendStream( resultTable, Row.class ).print("TableAPI");
        System.out.println("------------------------------------------------");
        tableEnv.toAppendStream( resultSqlTable, Row.class ).print("SQL");


        env.execute( "Table_Example" );
    }
}
